Imports


In [1]:
%pylab inline

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from future.builtins import (bytes, str, open, super, range,
                             zip, round, input, int, pow, object)

import sys

if sys.version_info.major == 2:
    # in Python 2 cPickle is much faster than pickle but doesn't deal w/ unicode
    import cPickle as pickle
else:
    # Python 3 loads the faster pickle by default if it's available
    import pickle

import collections
import random
import time


Populating the interactive namespace from numpy and matplotlib

SparkContext Info


In [2]:
from utils import header
print(header.create_header(sc))


Test Name                              No
Machine            c1-master.ec2.internal
Date                          19 Feb 2015
Start Time                       16:19:43

Spark Configuration
========================================================================
spark.executor.extraLibraryPath
    /opt/cloudera/parcels/CDH-5.3.1- .. 5.3.1.p0.5/lib/hadoop/lib/native
spark.executor.memory
    2g
spark.driver.extraLibraryPath
    /opt/cloudera/parcels/CDH-5.3.1- .. 5.3.1.p0.5/lib/hadoop/lib/native
spark.executor.instances
    8
spark.serializer.objectStreamReset
    100
spark.eventLog.enabled
    true
spark.yarn.historyServer.address
    http://c1-master.ec2.internal:18088
spark.cores.max
    8
spark.rdd.compress
    True
spark.app.name
    PySparkShell
spark.eventLog.dir
    hdfs://c1-master.ec2.internal:8020/user/spark/applicationHistory
spark.master
    yarn-client
========================================================================


Filter Functions for RDDs


In [3]:
def is_ob(ob):
    """ Returns True for observation lines; GSOD header lines contain "STN". """
    return "STN" not in ob

def is_station(station_id, ob):
    """ Returns True if the line belongs to the given station id. """
    return station_id in ob

def obs_by_station(obs_rdd):
    """ Given an RDD of observations from GSOD returns a dictionary of the observations
    the key is the station id and the value is an array of the observations for that
    station.
    
    """
    stations = obs_rdd.map(lambda line: (line.split()[0], 1))\
                      .reduceByKey(lambda x, y: x + y)\
                      .collect()
    
    station_obs = {}
    for station in stations:
        station_id = str(station[0])
        station_obs[station_id] = \
            obs_rdd.filter(lambda line: is_station(station_id, line))\
                   .collect()
        
    return station_obs

def obs_by_year(years):
    """ Given a list of years returns an ordered dictionary of the years with a value of
    a dictionary of station_ids with the value of observations.
    
    """
    years_of_obs = collections.OrderedDict()
    for year in years:
        obs_rdd = sc.textFile("/user/schiefjm/weather/gsod/" + str(year))\
                    .filter(lambda line: is_ob(line))
        obs_dict = collections.OrderedDict(obs_by_station(obs_rdd))
        years_of_obs[year] = obs_dict
    
    return years_of_obs
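
As a quick local sanity check (no SparkContext needed), the helpers can be
exercised on literal strings. The sample lines below are hypothetical and only
mimic the GSOD layout assumed above, where the station id is the first
whitespace-delimited token and header rows contain "STN":

# Hypothetical lines mimicking the GSOD layout (not real data).
header_line = "STN--- WBAN   YEARMODA    TEMP"
ob_line = "030050 99999  19290101    40.0"

assert not is_ob(header_line)          # header rows are filtered out
assert is_ob(ob_line)                  # observation rows pass through
assert is_station("030050", ob_line)   # matches on the station id token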

In [4]:
structured_obs = obs_by_year(list(range(1929, 1930)))
sys.getsizeof(structured_obs)


Out[4]:
280
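
The 280 bytes reported above cover only the outer dictionary object itself;
sys.getsizeof does not follow references into the nested dictionaries and
observation lists. For a rough lower bound on the full payload, the pickled
size of the structure can be used, as sketched here:

# Rough lower bound on the total payload: the length of the pickled
# structure (an estimate, not an exact in-memory accounting).
len(pickle.dumps(structured_obs))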

In [5]:
# Union of station ids seen across the selected years.
persistent_stations = set()

for year in structured_obs:
    for station_id in structured_obs[year]:
        persistent_stations.add(station_id)

print(len(persistent_stations))
# for station in sorted(persistent_stations):
#     print(station)


21
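
Note that persistent_stations is the union of station ids across the selected
years. If the intent is stations that report in every year, an intersection is
a small change, sketched below:

# Stations present in every selected year (intersection rather than union).
yearly_sets = [set(structured_obs[year]) for year in structured_obs]
stations_in_all_years = set.intersection(*yearly_sets) if yearly_sets else set()
print(len(stations_in_all_years))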

In [6]:
for year in range(1929, 1933):
    stations = sc.textFile("/user/schiefjm/weather/gsod/" + str(year))\
                 .filter(lambda line: "STN" not in line)\
                 .map(lambda line: (line.split()[0], 1))\
                 .reduceByKey(lambda x, y: x + y)\
                 .collect()
    
    # reduceByKey already yields one pair per station id, so set() is
    # redundant here; len(stations) would give the same count.
    print(year, len(set(stations)))


1929 21
1930 23
1931 31
1932 39
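
For reference, obs_by_station above launches one filter-and-collect Spark job
per station id. A single-pass alternative, sketched here under the same
assumptions about the GSOD layout, keys each line by its first token and
groups once:

def obs_by_station_single_pass(obs_rdd):
    """ Group observations by station id in one Spark job instead of one
    filter-and-collect per station.

    """
    return obs_rdd.map(lambda line: (line.split()[0], line))\
                  .groupByKey()\
                  .mapValues(list)\
                  .collectAsMap()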